#!/usr/bin/env python
#
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# A script which builds, flashes, and runs EC CTS
#
# Software prerequisites:
# - openocd version 0.10 or above
# - lsusb
# - udevadm
#
# To try it out, hook two boards (DEFAULT_TH and DEFAULT_DUT) with USB cables
# to the host and execute the script:
#   $ ./cts.py
# It'll run mock tests. The result will be stored in CTS_TEST_RESULT_DIR.

# Note: This is a py2/3 compatible file.

from __future__ import print_function

import argparse
import os
import shutil
import time
import common.board as board


CTS_RC_PREFIX = 'CTS_RC_'
DEFAULT_TH = 'stm32l476g-eval'
DEFAULT_DUT = 'nucleo-f072rb'
MAX_SUITE_TIME_SEC = 5
CTS_TEST_RESULT_DIR = '/tmp/ects'

# Host only return codes. Make sure they match values in cts.rc
CTS_RC_DID_NOT_START = -1          # test did not run.
CTS_RC_DID_NOT_END = -2            # test did not run.
CTS_RC_DUPLICATE_RUN = -3          # test was run multiple times.
CTS_RC_INVALID_RETURN_CODE = -4    # failed to parse return code


class Cts(object):
  """Class that represents a eCTS run.

  Attributes:
    dut: DeviceUnderTest object representing DUT
    th: TestHarness object representing a test harness
    module: Name of module to build/run tests for
    testlist: List of strings of test names contained in given module
    return_codes: Dict of strings of return codes, with a code's integer
      value being the index for the corresponding string representation
  """

  def __init__(self, ec_dir, th, dut, module):
    """Initializes cts class object with given arguments.

    Args:
      ec_dir: Path to ec directory
      th: Name of the test harness board
      dut: Name of the device under test board
      module: Name of module to build/run tests for (e.g. gpio, interrupt)
    """
    self.results_dir = os.path.join(CTS_TEST_RESULT_DIR, dut, module)
    if os.path.isdir(self.results_dir):
      shutil.rmtree(self.results_dir)
    else:
      os.makedirs(self.results_dir)
    self.ec_dir = ec_dir
    self.module = module
    serial_path = os.path.join(CTS_TEST_RESULT_DIR, 'th_serial')
    self.th = board.TestHarness(th, module, self.results_dir, serial_path)
    self.dut = board.DeviceUnderTest(dut, self.th, module, self.results_dir)
    cts_dir = os.path.join(self.ec_dir, 'cts')
    testlist_path = os.path.join(cts_dir, self.module, 'cts.testlist')
    return_codes_path = os.path.join(cts_dir, 'common', 'cts.rc')
    self.get_return_codes(return_codes_path)
    self.testlist = self.get_macro_args(testlist_path, 'CTS_TEST')

  def build(self):
    """Build images for DUT and TH."""
    print('Building DUT image...')
    if not self.dut.build(self.ec_dir):
      raise RuntimeError('Building module %s for DUT failed' % (self.module))
    print('Building TH image...')
    if not self.th.build(self.ec_dir):
      raise RuntimeError('Building module %s for TH failed' % (self.module))

  def flash_boards(self):
    """Flashes TH and DUT with their most recently built ec.bin."""
    cts_module = 'cts_' + self.module
    image_path = os.path.join('build', self.th.board, cts_module, 'ec.bin')
    self.identify_boards()
    print('Flashing TH with', image_path)
    if not self.th.flash(image_path):
      raise RuntimeError('Flashing TH failed')
    image_path = os.path.join('build', self.dut.board, cts_module, 'ec.bin')
    print('Flashing DUT with', image_path)
    if not self.dut.flash(image_path):
      raise RuntimeError('Flashing DUT failed')

  def setup(self):
    """Setup boards."""
    self.th.save_serial()

  def identify_boards(self):
    """Updates serials of TH and DUT in that order (order matters)."""
    self.th.get_serial()
    self.dut.get_serial()

  def get_macro_args(self, filepath, macro):
    """Get list of args of a macro in a file when macro.

    Args:
      filepath: String containing absolute path to the file
      macro: String containing text of macro to get args of

    Returns:
      List of dictionaries where each entry is:
        'name': Test name,
        'th_string': Expected string from TH,
        'dut_string': Expected string from DUT,
    """
    tests = []
    with open(filepath, 'r') as f:
      lines = f.readlines()
      joined = ''.join(lines).replace('\\\n', '').splitlines()
      for l in joined:
        if not l.strip().startswith(macro):
          continue
        d = {}
        l = l.strip()[len(macro):]
        l = l.strip('()').split(',')
        d['name'] = l[0].strip()
        d['th_rc'] = self.get_return_code_value(l[1].strip().strip('"'))
        d['th_string'] = l[2].strip().strip('"')
        d['dut_rc'] = self.get_return_code_value(l[3].strip().strip('"'))
        d['dut_string'] = l[4].strip().strip('"')
        tests.append(d)
    return tests

  def get_return_codes(self, filepath):
    """Read return code names from the return code definition file."""
    self.return_codes = {}
    val = 0
    with open(filepath, 'r') as f:
      for line in f:
        line = line.strip()
        if not line.startswith(CTS_RC_PREFIX):
          continue
        line = line.split(',')[0]
        if '=' in line:
          tokens = line.split('=')
          line = tokens[0].strip()
          val = int(tokens[1].strip())
        self.return_codes[line] = val
        val += 1

  def parse_output(self, output):
    """Parse console output from DUT or TH.

    Args:
      output: String containing consoule output

    Returns:
      List of dictionaries where each key and value are:
        name = 'ects_test_x',
        started = True/False,
        ended = True/False,
        rc = CTS_RC_*,
        output = All text between 'ects_test_x start' and 'ects_test_x end'
    """
    results = []
    i = 0
    for test in self.testlist:
      results.append({})
      results[i]['name'] = test['name']
      results[i]['started'] = False
      results[i]['rc'] = CTS_RC_DID_NOT_START
      results[i]['string'] = False
      results[i]['output'] = []
      i += 1

    i = 0
    for ln in [ln.strip() for ln in output.split('\n')]:
      if i + 1 > len(results):
        break
      tokens = ln.split()
      if len(tokens) >= 2:
        if tokens[0].strip() == results[i]['name']:
          if tokens[1].strip() == 'start':
            # start line found
            if results[i]['started']:   # Already started
              results[i]['rc'] = CTS_RC_DUPLICATE_RUN
            else:
              results[i]['rc'] = CTS_RC_DID_NOT_END
              results[i]['started'] = True
            continue
          elif results[i]['started'] and tokens[1].strip() == 'end':
            # end line found
            results[i]['rc'] = CTS_RC_INVALID_RETURN_CODE
            if len(tokens) == 3:
              try:
                results[i]['rc'] = int(tokens[2].strip())
              except ValueError:
                pass
            # Since index is incremented when 'end' is encountered, we don't
            # need to check duplicate 'end'.
            i += 1
            continue
      if results[i]['started']:
        results[i]['output'].append(ln)

    return results

  def get_return_code_name(self, code, strip_prefix=False):
    name = ''
    for k, v in self.return_codes.items():
      if v == code:
        if strip_prefix:
          name = k[len(CTS_RC_PREFIX):]
        else:
          name = k
    return name

  def get_return_code_value(self, name):
    if name:
      return self.return_codes[name]
    return  0

  def evaluate_run(self, dut_output, th_output):
    """Parse outputs to derive test results.

    Args:
      dut_output: String output of DUT
      th_output: String output of TH

    Returns:
      th_results: list of test results for TH
      dut_results: list of test results for DUT
    """
    th_results = self.parse_output(th_output)
    dut_results = self.parse_output(dut_output)

    # Search for expected string in each output
    for i, v in enumerate(self.testlist):
      if v['th_string'] in th_results[i]['output'] or not v['th_string']:
        th_results[i]['string'] = True
      if v['dut_string'] in dut_results[i]['output'] or not v['dut_string']:
        dut_results[i]['string'] = True

    return th_results, dut_results

  def print_result(self, th_results, dut_results):
    """Print results to the screen.

    Args:
      th_results: list of test results for TH
      dut_results: list of test results for DUT
    """
    len_test_name = max(len(s['name']) for s in self.testlist)
    len_code_name = max(len(self.get_return_code_name(v, True))
                        for v in self.return_codes.values())

    head = '{:^' + str(len_test_name) + '} '
    head += '{:^' + str(len_code_name) + '} '
    head += '{:^' + str(len_code_name) + '}'
    head += '{:^' + str(len(' TH_STR')) + '}'
    head += '{:^' + str(len(' DUT_STR')) + '}'
    head += '{:^' + str(len(' RESULT')) + '}\n'
    fmt = '{:' + str(len_test_name) + '} '
    fmt += '{:>' + str(len_code_name) + '} '
    fmt += '{:>' + str(len_code_name) + '}'
    fmt += '{:>' + str(len(' TH_STR')) + '}'
    fmt += '{:>' + str(len(' DUT_STR')) + '}'
    fmt += '{:>' + str(len(' RESULT')) + '}\n'

    self.formatted_results = head.format(
        'TEST NAME', 'TH_RC', 'DUT_RC',
        ' TH_STR', ' DUT_STR', ' RESULT')
    for i, d in enumerate(dut_results):
      th_cn = self.get_return_code_name(th_results[i]['rc'], True)
      dut_cn = self.get_return_code_name(dut_results[i]['rc'], True)
      th_res = self.evaluate_result(th_results[i],
                                    self.testlist[i]['th_rc'],
                                    self.testlist[i]['th_string'])
      dut_res = self.evaluate_result(dut_results[i],
                                     self.testlist[i]['dut_rc'],
                                     self.testlist[i]['dut_string'])
      self.formatted_results += fmt.format(
          d['name'], th_cn, dut_cn,
          'YES' if th_results[i]['string'] else 'NO',
          'YES' if dut_results[i]['string'] else 'NO',
          'PASS' if th_res and dut_res else 'FAIL')

  def evaluate_result(self, result, expected_rc, expected_string):
    if result['rc'] != expected_rc:
      return False
    if expected_string and expected_string not in result['output']:
      return False
    return True

  def run(self):
    """Resets boards, records test results in results dir."""
    print('Reading serials...')
    self.identify_boards()
    print('Opening DUT tty...')
    self.dut.setup_tty()
    print('Opening TH tty...')
    self.th.setup_tty()

    # Boards might be still writing to tty. Wait a few seconds before flashing.
    time.sleep(3)

    # clear buffers
    print('Clearing DUT tty...')
    self.dut.read_tty()
    print('Clearing TH tty...')
    self.th.read_tty()

    # Resets the boards and allows them to run tests
    # Due to current (7/27/16) version of sync function,
    # both boards must be rest and halted, with the th
    # resuming first, in order for the test suite to run in sync
    print('Halting TH...')
    if not self.th.reset_halt():
      raise RuntimeError('Failed to halt TH')
    print('Halting DUT...')
    if not self.dut.reset_halt():
      raise RuntimeError('Failed to halt DUT')
    print('Resuming TH...')
    if not self.th.resume():
      raise RuntimeError('Failed to resume TH')
    print('Resuming DUT...')
    if not self.dut.resume():
      raise RuntimeError('Failed to resume DUT')

    time.sleep(MAX_SUITE_TIME_SEC)

    print('Reading DUT tty...')
    dut_output, _ = self.dut.read_tty()
    self.dut.close_tty()
    print('Reading TH tty...')
    th_output, _ = self.th.read_tty()
    self.th.close_tty()

    print('Halting TH...')
    if not self.th.reset_halt():
      raise RuntimeError('Failed to halt TH')
    print('Halting DUT...')
    if not self.dut.reset_halt():
      raise RuntimeError('Failed to halt DUT')

    if not dut_output or not th_output:
      raise ValueError('Output missing from boards. If you have a process '
                       'reading ttyACMx, please kill that process and try '
                       'again.')

      print('Pursing results...')
    th_results, dut_results = self.evaluate_run(dut_output, th_output)

    # Print out results
    self.print_result(th_results, dut_results)

    # Write results
    dest = os.path.join(self.results_dir, 'results.log')
    with open(dest, 'w') as fl:
      fl.write(self.formatted_results)

    # Write UART outputs
    dest = os.path.join(self.results_dir, 'uart_th.log')
    with open(dest, 'w') as fl:
      fl.write(th_output)
    dest = os.path.join(self.results_dir, 'uart_dut.log')
    with open(dest, 'w') as fl:
      fl.write(dut_output)

    print(self.formatted_results)

    # TODO(chromium:735652): Should set exit code for the shell


def main():
  ec_dir = os.path.realpath(os.path.join(
      os.path.dirname(os.path.abspath(__file__)), '..'))
  os.chdir(ec_dir)

  dut = DEFAULT_DUT
  module = 'meta'

  parser = argparse.ArgumentParser(description='Used to build/flash boards')
  parser.add_argument('-d',
                      '--dut',
                      help='Specify DUT you want to build/flash')
  parser.add_argument('-m',
                      '--module',
                      help='Specify module you want to build/flash')
  parser.add_argument('-s',
                      '--setup',
                      action='store_true',
                      help='Connect only the TH to save its serial')
  parser.add_argument('-b',
                      '--build',
                      action='store_true',
                      help='Build test suite (no flashing)')
  parser.add_argument('-f',
                      '--flash',
                      action='store_true',
                      help='Flash boards with most recent images')
  parser.add_argument('-r',
                      '--run',
                      action='store_true',
                      help='Run tests without flashing')

  args = parser.parse_args()

  if args.module:
    module = args.module

  if args.dut:
    dut = args.dut

  cts = Cts(ec_dir, DEFAULT_TH, dut=dut, module=module)

  if args.setup:
    cts.setup()
  elif args.build:
    cts.build()
  elif args.flash:
    cts.flash_boards()
  elif args.run:
    cts.run()
  else:
    cts.build()
    cts.flash_boards()
    cts.run()

if __name__ == '__main__':
  main()
